Appearance
Stream 组件是用于封装消息队列来使用的,Bus这个组件一般会结合 Stream 使用;
看一般性文档解释,主要是围绕 Bus 可以做到配置的动态刷新、事件的传递。具体使用的时候可以看一下相关内容。
下面内容是引用 ChatGpt 的内容,分别是 Stream 和 Bus 的基本使用。
1、Spring Cloud Stream的使用
Spring Cloud Stream 是一个用于构建消息驱动微服务架构的框架,它基于Spring Boot和Spring Integration,提供了一种简单的方式来实现各个微服务组件之间的消息通信。
以下是Spring Cloud Stream的基本使用步骤:
- 引入依赖
在项目的 pom.xml 文件中,添加 Spring Cloud Stream 的依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-{binder}</artifactId>
</dependency>{binder} 可以是不同的消息中间件的名称,比如 RabbitMQ、Kafka 等。
- 配置消息中间件
在 application.properties 或 application.yml 文件中配置消息中间件的连接信息,以 RabbitMQ 为例:
spring:
cloud:
stream:
bindings:
output:
destination: my-output-topic
binder: rabbit # RabbitMQ的binder
rabbit:
bindings:
output:
exchangeType: topic
routingKeyExpression: 'my-output-topic'
- 发送消息
在应用程序中使用 Source 接口定义消息发送的输出通道:
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface MySource {
String OUTPUT = "output";
@Output(OUTPUT)
MessageChannel output();
}然后在需要发送消息的地方注入 MySource,并使用 output() 方法发送消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class MessageSenderService {
@Autowired
private MySource mySource;
public void sendMessage(String payload) {
Message<String> message = MessageBuilder.withPayload(payload).build();
mySource.output().send(message);
}
}
- 接收消息
在应用程序中使用 @StreamListener 注解定义消息接收的输入通道:
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Service
public class MessageReceiverService {
@StreamListener(MySource.INPUT)
public void receiveMessage(@Payload String payload) {
System.out.println("Received message: " + payload);
// 处理接收到的消息
}
}
- 运行时绑定
Spring Cloud Stream 会在运行时将消息通道绑定到消息中间件上,使得消息的发送和接收可以与底层的消息中间件解耦。根据配置的 {binder} 不同,可以支持 RabbitMQ、Kafka、ActiveMQ 等多种消息中间件。
通过以上步骤,你就可以使用 Spring Cloud Stream 在微服务中实现消息的发送和接收,从而构建消息驱动的微服务架构。
2、SpringCloud Bus 的使用
Spring Cloud Bus是一个用于在分布式系统中传播事件的工具,它建立在Spring Cloud Stream之上。它允许微服务架构中的组件之间进行消息推送,实现配置的动态刷新、事件的传递等功能。
以下是Spring Cloud Bus的基本使用步骤:
- 引入依赖
在项目的pom.xml文件中添加Spring Cloud Bus的依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>上述示例中使用了AMQP(Advanced Message Queuing Protocol)作为消息传递的中间件,你也可以选择其他消息中间件,如Kafka等。
- 配置消息中间件
在application.properties或application.yml中配置消息中间件的连接信息:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
- 配置Bus
在微服务的application.properties或application.yml中,添加以下配置:
spring:
cloud:
bus:
enabled: true4.发送消息
在任意微服务中,可以使用Spring Cloud Bus发送消息来触发事件。可以使用HTTP POST请求向/actuator/bus-refresh端点发送刷新请求:
curl -X POST http://localhost:8080/actuator/bus-refresh这样就会触发Spring Cloud Bus发送一个刷新事件,然后各个微服务都会接收到这个事件,从而触发配置的刷新操作。
- 接收消息
在微服务中,可以使用@RefreshScope注解标记一个类,使得该类中的配置在接收到刷新事件时能够重新加载:
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RefreshScope
public class MyController {
@Value("${my.property}")
private String myProperty;
@GetMapping("/property")
public String getProperty() {
return myProperty;
}
}上述示例中,@RefreshScope注解使得MyController类中的配置在接收到刷新事件时会重新加载。
通过这些步骤,你就可以使用Spring Cloud Bus实现在分布式系统中传播事件,例如动态刷新配置、传递消息等。
to be contined.....